Recommendation Engine

Problem Statement
The input data file contains user, item, and rating triples.
The goal of this exercise is to build a recommendation model
and then predict each user's affinity for various items.
# -*- coding: utf-8 -*-

import os

os.chdir("/home/cloudops/spark")
os.getcwd()   # confirm the current working directory
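
# This script assumes the PySpark shell, where sc (a SparkContext)
# is already defined. A minimal sketch for running it standalone
# instead (the appName is an assumption, not from the original):
from pyspark import SparkContext

if "sc" not in globals():
    sc = SparkContext(appName="RecommendationEngine")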

# Load the data file in ALS format (user, item, rating)
ratingsData = sc.textFile("UserItemData.txt")
ratingsData.collect()
# Affinity of user to item: 'user,item,rating' (10 = highest)
# ['1001,9001,10',
#  '1001,9002,1',
#  '1001,9003,9',
#  '1002,9001,3',
# . . .

# =====================================
# Parse each line into a typed (user, item, rating) tuple (RDD)
ratingVector = ratingsData.map(lambda l: l.split(','))\
        .map(lambda l: (int(l[0]), int(l[1]), float(l[2])))

# =====================================
# Build a SQL DataFrame
from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)
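
# On Spark 2.x and later, SparkSession is the preferred entry point;
# a sketch of the equivalent setup (shown for reference, not used
# below; the appName is an assumption):
# from pyspark.sql import SparkSession
# spark = SparkSession.builder.appName("RecommendationEngine").getOrCreate()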

ratingsDf = sqlContext.createDataFrame(ratingVector,
                                       ["user","item","rating"])
# DataFrame[user: bigint, item: bigint, rating: double]
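
# Optional sanity check before training: confirm the inferred
# schema and inspect a few rows (output omitted here)
ratingsDf.printSchema()
ratingsDf.show(5)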

# =====================================
# Build the model based on ALS
# ALS - Alternating Least Squares
from pyspark.ml.recommendation import ALS

als = ALS(rank=10, maxIter=5)
# Learn latent factors that model user-item affinity
model = als.fit(ratingsDf)
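
# ALS finds its input columns via defaults (userCol="user",
# itemCol="item", ratingCol="rating"), which match the DataFrame
# above. A sketch that makes the mapping explicit and fixes the
# seed for reproducible factors (the regParam and seed values are
# assumptions):
alsExplicit = ALS(rank=10, maxIter=5, regParam=0.1, seed=42,
                  userCol="user", itemCol="item", ratingCol="rating")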

model.userFactors.orderBy("id").collect()
# [Row(id=1001, features=[-0.45539024472236633,
#                          0.36960071325302124,
#                          0.851772665977478,
#                          0.6629341840744019,
#                          0.7355863451957703,
#                          0.6816525459289551,
#                          0.7078778743743896,
#                         -0.37058207392692566,
#                         -0.12031808495521545,
#                         -0.196274533867836]),
# . . .
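
# =====================================
# The learned item-side factors are exposed the same way
# (output omitted):
model.itemFactors.orderBy("id").collect()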

# =====================================
# Create a test DataFrame of (user, item) pairs to score
# What is the affinity between user 1001 and each item?
# =====================================
testDf = sqlContext.createDataFrame(
        [(1001, 9003),
         (1001, 9004),
         (1001, 9005)],
        ["user", "item"])

# =====================================
# Predict
# =====================================
predictions = sorted(model.transform(testDf).collect(),
                     key=lambda r: r[2], reverse=True)

predictions
# [Row(user=1001, item=9003, prediction=9.006633758544922),
#  Row(user=1001, item=9004, prediction=-0.6739926934242249),
#  Row(user=1001, item=9005, prediction=-2.546449899673462)]
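
# =====================================
# On Spark 2.2+, the fitted model can also emit top-N lists
# directly, and setting coldStartStrategy="drop" on the ALS
# estimator avoids NaN predictions for unseen users/items.
# A sketch (k=3 is an assumption); left commented out since the
# Spark version used above may predate 2.2:
# model.recommendForAllUsers(3).show(truncate=False)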